-
Notifications
You must be signed in to change notification settings - Fork 355
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CELEBORN-1490][CIP-6] Introduce tier consumer for hybrid shuffle #2786
base: main
Are you sure you want to change the base?
Conversation
...common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
Show resolved
Hide resolved
...n/java/org/apache/celeborn/plugin/flink/network/TransportFrameDecoderWithBufferSupplier.java
Outdated
Show resolved
Hide resolved
...k/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java
Outdated
Show resolved
Hide resolved
...k/common/src/main/java/org/apache/celeborn/plugin/flink/readclient/CelebornBufferStream.java
Show resolved
Hide resolved
a3497a2
to
1f28a9f
Compare
@@ -125,6 +125,7 @@ private io.netty.buffer.ByteBuf decodeBodyCopyOut( | |||
|
|||
ReadData readData = (ReadData) curMsg; | |||
long streamId = readData.getStreamId(); | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any change?
subPartitionsNeedNotifyAvailable; | ||
|
||
@GuardedBy("lock") | ||
private boolean hasStart = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private boolean hasStart = false; | |
private boolean started = false; |
this.receivedBuffers = new HashMap<>(); | ||
this.subPartitionsNeedNotifyAvailable = new HashSet<>(); | ||
for (TierShuffleDescriptor shuffleDescriptor : shuffleDescriptors) { | ||
if (!(shuffleDescriptor instanceof TierShuffleDescriptorImpl)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (!(shuffleDescriptor instanceof TierShuffleDescriptorImpl)) { | |
if (shuffleDescriptor instanceof TierShuffleDescriptorImpl) { | |
initShuffleClient((TierShuffleDescriptorImpl) shuffleDescriptor); | |
break; | |
} |
initShuffleClient((TierShuffleDescriptorImpl) shuffleDescriptor); | ||
break; | ||
} | ||
initBufferReaders(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If shuffleClient
is null, does this need to initialize buffer readers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if shuffleClient
is null, initializing buffer readers is unnecessary.
In the current implementation, the shuffleClient
should not be null. I will add a check for shuffleClient
.
@Override | ||
public void onFailure(Throwable e) { | ||
logger.error( | ||
"Send PbNotifyRequiredSegment to {} failed, detail {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add streamId in log.
boolean isBufferUsed = false; | ||
try { | ||
synchronized (bufferQueue) { | ||
checkState( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If isWaitingForFloatingBuffers
is false, should this return false instead of checking state?
requestMessage -> { | ||
// Note that we need to use SubPartitionReadData because the isSegmentGranularityVisible | ||
// is set | ||
// as true when opening stream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move line 111 after line 110.
} | ||
|
||
public void close() { | ||
// It may be call multiple times because subPartitions can share the same reader, as a single |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does subPartitionRequiredSegmentIds
need to clean?
|
||
try { | ||
if (bufferManager != null) { | ||
bufferManager.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bufferManager = null
.
this.subPartitionIndexEnd = endSubIdx; | ||
this.dataListener = dataListener; | ||
this.failureListener = failureListener; | ||
this.subPartitionRequiredSegmentIds = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.subPartitionRequiredSegmentIds = new ConcurrentHashMap<>(); | |
this.subPartitionRequiredSegmentIds = JavaUtils.newConcurrentHashMap<>(); |
this.subPartitionRequiredSegmentIds.put(subPartitionId, requiredSegmentId); | ||
if (!closed && !CelebornBufferStream.isEmptyStream(bufferStream)) { | ||
LOG.debug( | ||
"notifyRequiredSegmentId, shuffleId: {}, partitionId: {}, requiredSegmentId: {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"notifyRequiredSegmentId, shuffleId: {}, partitionId: {}, requiredSegmentId: {}", | |
"Notify required segment id {} for {} {} {}.", requiredSegmentId, partitionId, subPartitionId, shuffleId |
...-1.20/src/main/java/org/apache/celeborn/plugin/flink/tiered/CelebornChannelBufferReader.java
Show resolved
Hide resolved
Co-authored-by: Xu Huang <[email protected]>
1f28a9f
to
b1e028b
Compare
@SteNicholas Thank you for your careful review. I have made modifications in this pull request based on your comments. PTAL. |
private boolean isWaitingForFloatingBuffers; | ||
|
||
/** The total number of required buffers for the respective input channel. */ | ||
private int numRequiredBuffers = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does numRequiredBuffers
need to use AtomicInteger
?
|
||
int tryRequestBuffersIfNeeded() { | ||
synchronized (bufferQueue) { | ||
if (numRequiredBuffers > 0 && !isWaitingForFloatingBuffers && bufferQueue.isEmpty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this not try request buffers when bufferQueue
is not empty?
|
||
public void setup(TieredStorageMemoryManager memoryManager) { | ||
this.bufferManager = new CelebornChannelBufferManager(memoryManager, this); | ||
if (numBackLog > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When setting up, why does this check numBackLog
? Is there any case that numBackLog
> 0?
bufferStream.open(this::requestBuffer, initialCredit, messageConsumer); | ||
this.isOpened = bufferStream.isOpened(); | ||
} catch (Exception e) { | ||
messageConsumer.accept(new TransportableError(0L, e)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could add error log for exception.
checkState( | ||
readData.getSubPartitionId() >= subPartitionIndexStart | ||
&& readData.getSubPartitionId() <= subPartitionIndexEnd, | ||
"Wrong sub partition id"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add readData.getSubPartitionId()
in message.
|
||
// return the subPartitionId if already receive buffer from corresponding subpartition | ||
Map<TieredStorageSubpartitionId, Queue<Buffer>> subPartitionReceivedBuffers = | ||
receivedBuffers.get(tieredStoragePartitionId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could receivedBuffers
use ConcurrentHashMap
? Meanwhile, the lock blocker only locks health check. WDYT?
} | ||
checkState( | ||
shuffleDescriptor.getResultPartitionID().equals(tieredStoragePartitionId.getPartitionID()), | ||
"Wrong result partition id."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add shuffleDescriptor.getResultPartitionID()
in message.
} | ||
Class<?> clazz = PartitionUnRetryAbleException.class; | ||
if (throwable.getMessage() != null && throwable.getMessage().contains(clazz.getName())) { | ||
cause = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could also log throwable.
return; | ||
} | ||
TierShuffleDescriptorImpl shuffleDescriptor = (TierShuffleDescriptorImpl) tierShuffleDescriptor; | ||
if (shuffleClient == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When is shuffleClient
null?
What changes were proposed in this pull request?
Introduce tier consumer for hybrid shuffle
Does this PR introduce any user-facing change?
No
How was this patch tested?
unit test